[SPARK-51008][SQL] Add ResultStage for AQE #49715
cc @ulysses-you
So this PR is just one of the stage-level feature PRs?
@ulysses-you yes, after this PR, we can implement the idea proposed in #44013 (comment) and keep contexts in the AQE query stage.
…e/AdaptiveSparkPlanExec.scala Co-authored-by: Wenchen Fan <[email protected]>
…e/AdaptiveSparkPlanExec.scala Co-authored-by: Wenchen Fan <[email protected]>
LGTM except for some style nits
shall we ignore |
I think we already did it for all query stages. @liuzqt how did you see the result query stage in the UI?
  }

  // Result stage could be any SparkPlan, so we don't have a specific runtime statistics for it.
  override def getRuntimeStatistics: Statistics = Statistics(sizeInBytes = 0, rowCount = None)
I'm concerned about `sizeInBytes = 0`: the dummy statistics used elsewhere in the Spark code base are `Long.MaxValue`. Shall we use `Statistics.DUMMY`?
+1
Makes sense, switched to `Statistics.DUMMY` instead.
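To make the concern in this thread concrete, here is a minimal sketch of why a placeholder size of 0 can be riskier than a very large one. It does not use Spark's classes: `Stats`, `StatsSketch`, `Dummy`, and `looksBroadcastable` are hypothetical names invented for illustration, and the size-based rule is an assumption about how a consumer of the statistics might behave, not the actual planner logic.

```scala
// Hypothetical stand-in for a statistics record; NOT Spark's Statistics class.
final case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

object StatsSketch {
  // Conservative placeholder that follows the Long.MaxValue convention
  // mentioned in the review comment above.
  val Dummy: Stats = Stats(sizeInBytes = BigInt(Long.MaxValue))

  // Assumed size-based rule: a plan "looks broadcastable" when its reported
  // size is non-negative and no larger than the threshold.
  def looksBroadcastable(stats: Stats, thresholdBytes: Long): Boolean =
    stats.sizeInBytes >= BigInt(0) && stats.sizeInBytes <= BigInt(thresholdBytes)

  def main(args: Array[String]): Unit = {
    val threshold = 10L * 1024 * 1024 // illustrative 10 MiB threshold
    // A size of 0 passes any non-negative threshold, so an unknown plan
    // would be treated as tiny.
    println(looksBroadcastable(Stats(sizeInBytes = BigInt(0)), threshold))
    // The large placeholder fails the check, so an unknown plan is treated
    // as too big to optimize on size alone.
    println(looksBroadcastable(Dummy, threshold))
  }
}
```

Under this assumed rule, the zero-size placeholder reads as "known to be tiny", while the large placeholder reads as "unknown, assume big", which is the safer default when no real measurement exists.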
I think we need to explicitly match the name to ignore it (updated in this commit).
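A rough sketch of what "explicitly match the name to ignore it" could look like when building a UI graph from a plan tree. This is not the code from the commit: `PlanNode`, `GraphSketch`, and `stripWrappers` are hypothetical, and the real UI code treats query stages with more care than this (the sketch simply splices every wrapper's children into its parent).

```scala
// Hypothetical plan-node model for illustration; Spark's UI uses its own classes.
final case class PlanNode(name: String, children: List[PlanNode] = Nil)

object GraphSketch {
  // Returns the tree with wrapper nodes removed. A wrapper is recognized by an
  // exact match on its node name and is replaced by its (already cleaned) children.
  def stripWrappers(node: PlanNode): List[PlanNode] = {
    val keptChildren = node.children.flatMap(stripWrappers)
    node.name match {
      case "ShuffleQueryStage" | "BroadcastQueryStage" | "ResultQueryStage" =>
        keptChildren
      case _ =>
        List(node.copy(children = keptChildren))
    }
  }
}
```

Matching on the literal name means a newly introduced wrapper such as `ResultQueryStage` is only hidden once its name is added to the pattern, which is consistent with the comment above.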
Thanks, merging to master/4.0!
### What changes were proposed in this pull request?

Added ResultQueryStageExec for AQE.

What the query plan looks like in the explain string:

```
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 2 ------> newly added
   +- *(5) Project [id#26L]
      +- *(5) SortMergeJoin [id#26L], [id#27L], Inner
         :- *(3) Sort [id#26L ASC NULLS FIRST], false, 0
         :  +- AQEShuffleRead coalesced
         :     +- ShuffleQueryStage 0
         :        +- Exchange hashpartitioning(id#26L, 200), ENSURE_REQUIREMENTS, [plan_id=247]
         :           +- *(1) Range (0, 25600, step=1, splits=10)
         +- *(4) Sort [id#27L ASC NULLS FIRST], false, 0
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 1
                  +- Exchange hashpartitioning(id#27L, 200), ENSURE_REQUIREMENTS, [plan_id=257]
                     +- *(2) Ran...
```

What the query plan looks like in the Spark UI:

<img width="680" alt="Screenshot 2025-02-03 at 4 11 43 PM" src="https://github.com/user-attachments/assets/86946e19-ffdd-42dd-974a-62a8300ddac8" />

### Why are the changes needed?

Currently the AQE framework is not fully self-contained, since not all plan segments can be put into a query stage: the final "stage" is basically executed as a non-AQE plan. This PR adds a result query stage for AQE to unify the framework. With this change, we can build more query-stage-level features; one use case is #44013 (comment).

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

New unit tests. Existing tests that are impacted by this change were also updated to keep their original test semantics.

### Was this patch authored or co-authored using generative AI tooling?

NO

Closes #49715 from liuzqt/SPARK-51008.

Lead-authored-by: liuzqt <[email protected]>
Co-authored-by: Ziqi Liu <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 207390b)
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?

Added ResultQueryStageExec for AQE.

What the query plan looks like in the explain string:

No change in the Spark UI, since we ignore `ResultQueryStage` just like we did for other query stages.

### Why are the changes needed?

Currently the AQE framework is not fully self-contained, since not all plan segments can be put into a query stage: the final "stage" is basically executed as a non-AQE plan. This PR adds a result query stage for AQE to unify the framework. With this change, we can build more query-stage-level features; one use case is #44013 (comment).

### Does this PR introduce any user-facing change?

NO

### How was this patch tested?

New unit tests. Existing tests that are impacted by this change were also updated to keep their original test semantics.

### Was this patch authored or co-authored using generative AI tooling?

NO
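For readers who want to see the explain string described in this PR, the following is a minimal sketch of a query with the same shape (two ranges of 25600 rows in 10 partitions, joined on `id` with a sort-merge join). It assumes Spark SQL is on the classpath and that the build includes this change; on builds without it, the `ResultQueryStage` node will not appear. The object name, app name, and local master setting are illustrative choices, not taken from the PR.

```scala
import org.apache.spark.sql.SparkSession

object ResultStageExplain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("result-query-stage-explain")
      .config("spark.sql.adaptive.enabled", "true")
      // Disable broadcast joins so the planner falls back to a sort-merge join.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()

    // Two ranges with the same bounds and partition count as the plan in the description.
    val left = spark.range(0, 25600, 1, 10)
    val right = spark.range(0, 25600, 1, 10)
    val joined = left.join(right, "id")

    // Run the query first so AQE can finalize the plan, then print it.
    joined.collect()
    joined.explain()

    spark.stop()
  }
}
```

Expression IDs, plan IDs, and stage numbers in the printed plan will generally differ from those quoted in the description.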